First, let's get the interesting stuff out of the GRC file. There's a bit of trial-and-error here as we figure out what's what in the file.
from xml.dom import minidom
xmldoc = minidom.parse('/home/jbm/src/gnuradio/gr-uhd/examples/grc/uhd_dpsk_mod.grc')
import xml.etree.ElementTree as ET # for generation
from IPython.display import SVG, display
import math # because everyone needs a little trig in their life
n = xmldoc.childNodes[0]
n = xmldoc.getElementsByTagName("flow_graph")[0]
n.getElementsByTagName("block")[0]
m = n.getElementsByTagName("key")[0]
l = m.childNodes[0]
l.data
math.atan2(7.0, -94.0)
class GRCVBlock:
def __init__(self, el):
self.params = {}
self.key = el.getElementsByTagName("key")[0].childNodes[0].data
for p in el.getElementsByTagName("param"):
k = p.getElementsByTagName("key")[0].childNodes[0].data
try:
v = p.getElementsByTagName("value")[0].childNodes[0].data
except IndexError:
v = ""
self.params[k] = v
self.connections = [] # List of (direction, my_port, remote_id, remote_port)
self.port_coords = {}
def get_id(self):
return self.params["id"]
def connect(self, direction, my_port, remote_id, remote_port):
candidate_row = (direction, my_port, remote_id, remote_port)
for r in self.connections:
if r == candidate_row:
return
self.connections.append(candidate_row)
def ports(self, direction):
retval = set()
for d, my_port, remote_id, remote_port in self.connections:
if d == direction:
retval.add(my_port)
return list(retval)
def coordinates(self):
"Extract the x,y and rotation for this block"
raw_xy = self.params["_coordinate"]
x,y = map(float, raw_xy[1:-1].split(","))
theta = float(self.params["_rotation"])
return (x,y,theta)
def coordinates_port(self, direction, port):
return self.port_coords["%s,%s" % (direction,port)]
def enabled(self):
return self.params["_enabled"]
def attach_svg(self, parent):
(x,y,t) = self.coordinates()
t_rad = t*math.pi/180.0
header_text = self.get_id()
if self.key == "note":
header_text = "Note: %s" % self.params["note"]
ports_in = self.ports("in")
ports_out = self.ports("out")
max_ports = max(map(len, [ports_in, ports_out]))
PORT_WIDTH = 10
PORT_HEIGHT = 20
PORT_GAP = 2
HEADER_HEIGHT = 22
FOOTER_HEIGHT = HEADER_HEIGHT/2
h = HEADER_HEIGHT + (PORT_HEIGHT+PORT_GAP)*max_ports + FOOTER_HEIGHT
w = 10 + 7 * len(header_text)
x_c = x+w/2
y_c = y+h/2
palette = {
"options": ("grey", 0.1),
"variable": ("#8888aa", 0.2),
"variable_slider": ("#8888aa", 0.3),
"import": ("#aa8888", 0.3),
"notebook": ("#888888", 0.5),
"note": ("#ff4444", 0.4),
"_": ("#aa8888", 0.2)
}
palette_entry = palette["_"]
if self.key in palette:
palette_entry = palette[self.key]
blk = ET.SubElement(parent,
"g")
blk.set("class","grc_block")
if abs(t) > 0.0001:
blk.set("transform", "rotate(%f %f %f)" % (t, x_c, y_c))
#################
# Base rectangle
rect = ET.SubElement(blk,
"rect",
x=str(x),
y=str(y),
height=str(h),
width=str(w),
style="fill: %s; opacity: %f;" % palette_entry)
rect.set("class", "key_%s" % self.key)
#################
# Header text
txt = ET.SubElement(blk,
"text",
x=str(x+15),
y=str(y+HEADER_HEIGHT-5),
height=str(h),
width=str(w),
style="text-weight: bold;")
txt.text = header_text
#############
# I/O Ports
ports_in.sort()
for i,p in enumerate(ports_in):
y_p = y + HEADER_HEIGHT + (i+1)*(PORT_GAP) + i*PORT_HEIGHT
x_p = x - PORT_WIDTH
rect_p = ET.SubElement(blk,
"rect",
x=str(x_p),
y=str(y_p),
width=str(PORT_WIDTH),
height=str(PORT_HEIGHT),
style="fill:grey; opacity: 0.5;")
x_logical = x_p+PORT_WIDTH/2
x_rel = x_logical-x_c
y_logical = y_p+PORT_HEIGHT/2
y_rel = y_logical - y_c
l = math.sqrt(x_rel*x_rel + y_rel*y_rel)
t0 = math.atan2(y_rel, x_rel)
t_act = t0+t_rad
x_proj = x_c + l * math.cos(t_act)
y_proj = y_c + l * math.sin(t_act)
self.port_coords[ "in,%s" % p ] = (x_proj,y_proj)
# ET.SubElement(parent, "text", x=str(x_proj),y=str(y_proj)).text="i"
ports_out.sort()
for i,p in enumerate(ports_out):
y_p = y + HEADER_HEIGHT + (i+1)*(PORT_GAP) + i*PORT_HEIGHT
x_p = x + w
rect_p = ET.SubElement(blk,
"rect",
x=str(x_p),
y=str(y_p),
width=str(PORT_WIDTH),
height=str(PORT_HEIGHT),
style="fill:grey; opacity: 0.5;")
x_logical = x_p+PORT_WIDTH/2
x_rel = x_logical-x_c
y_logical = y_p+PORT_HEIGHT/2
y_rel = y_logical - y_c
l = math.sqrt(x_rel*x_rel + y_rel*y_rel)
t0 = math.atan2(y_rel, x_rel)
t_act = t0+t_rad
x_proj = x_c + l * math.cos(t_act)
y_proj = y_c + l * math.sin(t_act)
# ET.SubElement(parent, "text", x=str(x_proj),y=str(y_proj)).text="o"
self.port_coords[ "out,%s" % p ] = (x_proj,y_proj)
all_blocks = map(GRCVBlock, n.getElementsByTagName("block"))
try:
# http://nbviewer.ipython.org/gist/rpmuller/5666810
from IPython.display import SVG
import xml.etree.ElementTree as ET
#fg = GRCVFlowGraph("/home/jbm/src/gnuradio-grc-examples/receiver/apt_play.grc")
fg = GRCVFlowGraph("rds_rx_janked.grc")
display(SVG(fg.svg()))
except Exception, e:
print str(e)
print "You're probably fine; this is premature to run this. It's here to make tweaking easier."
[ (b.key, b.get_id(), b.coordinates() ) for b in all_blocks ]
from collections import defaultdict
paramcounts = defaultdict(int)
for b in all_blocks:
for k in b.params:
paramcounts[k] += 1
all_params = paramcounts.keys()
all_params.sort(key=lambda s: -1*paramcounts[s])
for p in all_params[0:7]:
print "%s === > %d" % (p, paramcounts[p])
class GRCVConnection:
def __init__(self, el):
def gv(x):
try:
return el.getElementsByTagName(x)[0].childNodes[0].data
except IndexError:
return None
self.source = (gv("source_block_id"), gv("source_key"))
self.sink = (gv("sink_block_id"), gv("sink_key"))
all_conns = map(GRCVConnection, n.getElementsByTagName("connection"))
len(all_conns)
len("xlate_bandwidth")
[ (c.source, c.sink) for c in all_conns ]
import xml.etree.ElementTree as ET
from xml.dom import minidom
class GRCVFlowGraph:
def __init__(self, path):
xmldoc = minidom.parse(path)
fg = xmldoc.getElementsByTagName("flow_graph")[0]
# XXX TODO Skip the timestamp
self.blocks = {}
for el in fg.getElementsByTagName("block"):
b = GRCVBlock(el)
self.blocks[b.get_id()] = b
self.connections = map(GRCVConnection,
fg.getElementsByTagName("connection"))
for c in self.connections:
(src_id, src_port) = c.source
(snk_id, snk_port) = c.sink
self.blocks[src_id].connect("out", src_port, snk_id, snk_port)
self.blocks[snk_id].connect("in", snk_port, src_id, src_port)
def bounds(self):
coords = [ b.coordinates() for b in self.blocks.values() ]
xmin = min([c[0] for c in coords])
xmax = max([c[0] for c in coords])
ymin = min([c[1] for c in coords])
ymax = max([c[1] for c in coords])
return ( (xmin,ymin), (xmax,ymax) )
def svg(self):
bbox = self.bounds()
# Just take the maximum plus fudge factor for now
(width,height) = bbox[1]
width *= 1.20
height *= 1.20
svg = ET.Element('svg',
xmlns="http://www.w3.org/2000/svg",
version="1.1",
height="%f" % height,
width="%f" % width)
defs = ET.SubElement(svg, "defs")
arrow = ET.SubElement(defs, "marker")
toset = {
"id": "markerArrow",
"markerWidth": "9",
"markerHeight": "9",
"refx": "2",
"refy": "6",
"orient": "auto"
}
for k,v in toset.iteritems():
arrow.set(k,v)
ET.SubElement(arrow, "path", d="M2,2 L2,9 L9,6 L2,2", style="fill: black;")
canvas = ET.SubElement(svg, 'g', transform="translate(20 20)")
for b in self.blocks.values():
b.attach_svg(canvas)
for c in self.connections:
(src_id, src_port) = c.source
(snk_id, snk_port) = c.sink
x0,y0 = self.blocks[src_id].coordinates_port("out",src_port)
x1,y1 = self.blocks[snk_id].coordinates_port("in",snk_port)
line = ET.SubElement(canvas,
"line",
x1=str(x0),
y1=str(y0),
x2=str(x1),
y2=str(y1),
stroke="#333333",
style="marker-end: url(#markerArrow);")
line.set("stroke-width", "1.5")
return ET.tostring(svg)
fg = GRCVFlowGraph("rds_rx_janked.grc")
fg.bounds()
fg.svg()[0:100]
# http://nbviewer.ipython.org/gist/rpmuller/5666810
from IPython.display import SVG
import xml.etree.ElementTree as ET
fg = GRCVFlowGraph("rds_rx_janked.grc")
SVG(fg.svg())
fg = GRCVFlowGraph("/home/jbm/src/misc/gr-rds/apps/rds_rx.grc")
SVG(fg.svg())